Análisis de cambio en temperaturas a nivel global por ciudades

Versión preliminar

Descripción

En este trabajo se utilizan dos conjuntos de datos para hacer un análisis del cambio de temperatura a nivel global por ciudades. El primer conjunto de datos tiene datos de temperatura desde 1975 por ciudades del mundo y el segundo conjunto de datos son datos adicionales de ciudades que incluyen su elevación en metros sobre el nivel del mar.

El objetivo es hacer el cruce de dos conjuntos de datos para ver la relación entre la variación de temperatura y las ciudades según su altitud.

Obtendremos gráficas representativas de variables como la relación entre incremento de temperatura y tiempo, Países con mayor incremento de temperatura, relación entre el incremento de la temperatura y la altitud de las ciudades.

También se utilizará la regresión lineal para obtener un modelo de predicción de la variación de temperatura para los siguientes 50 años, también un modelo de predicción similar pero relacionado a la relación entre el incremento de temperatura y la altitud de las ciudades.

Como se trata de un conjunto de datos grande se aprovechará una infraestructura de clúster spark con un maestro y dos workers para ejecutar tareas distribuidas, las tareas que se ejecutarán de forma distribuida son:

  • Cargado de los conjuntos de datos en dataframes spark.
  • Tratamiento de datos nulos evaluando si aplicar eliminación o alguna técnica para compensar la falta de datos.
  • Mezclado de ambos dataframes en uno solo.
  • Aplicación de funciones UDF para ayudar con las tareas de limpieza y procesamiento.
  • Aplicación de la técnica de regresión lineal simple o regresión lineal polinomial según corresponda para obtener el modelo de predicción.
  • Guardado del dataframe procesado en archivos parquet.
In [1]:
import os
memory = '5g'
pyspark_submit_args = ' --driver-memory ' + memory + ' pyspark-shell'
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args
In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MLApp2").getOrCreate()
from pyspark.ml.linalg import Vectors
import pandas as pd
from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType
In [3]:
sc = spark.sparkContext 
In [4]:
!ls
cities-altitude-sparql.csv    elevationsByCity-2M.csv
eda-1000000_v2.ipynb	      elevationsByCity.csv
eda-1000000_v3-Copy1.ipynb    GlobalLandTemperaturesByCity.1000000.csv
eda-1000000_v3.ipynb	      GlobalLandTemperaturesByCity-2M.csv
eda-v1.ipynb		      GlobalLandTemperaturesByCity.csv
eda_v3-4M.ipynb		      MPIData_augmented.csv
eda_v3.ipynb		      spark-warehouse
eda_v4.ipynb		      temp-plot.html
elevationsByCity.1000000.csv  Untitled.ipynb
In [5]:
#dtf1 = spark.read.csv('hdfs:///datasets/GlobalLandTemperaturesByCity.1000000.csv',
dtf1 = spark.read.csv('./GlobalLandTemperaturesByCity.csv',
                       inferSchema=True, 
                       header=True)
In [6]:
dtf1.show(10)
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01 00:00:00|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-04-01 00:00:00|5.7879999999999985|           3.6239999999999997|Århus|Denmark|  57.05N|   10.33E|
|1744-05-01 00:00:00|            10.644|           1.2830000000000001|Århus|Denmark|  57.05N|   10.33E|
|1744-06-01 00:00:00|14.050999999999998|                        1.347|Århus|Denmark|  57.05N|   10.33E|
|1744-07-01 00:00:00|            16.082|                        1.396|Århus|Denmark|  57.05N|   10.33E|
|1744-08-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 10 rows

In [7]:
type(dtf1)
Out[7]:
pyspark.sql.dataframe.DataFrame
In [8]:
print(dtf1.count())
8599212

EDA

Dataset Temperatura Promedio por ciudades

Este dataset contiene registros de temperatura promedio por día en ciudades del mundo, también tiene datos de de psocionamiento geográficos como latitud y longitud.

Se empieza analizando la ausencia de valores, haciendo ajustes en los tipos de datos esto por que algunas operaciones posteriores requieren que las variables estén en un tipo de dato estándar de spark.

In [9]:
dtf1.printSchema()
root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)

In [10]:
# Comprobando nulos en AverageTemperature
from pyspark.sql.functions import isnan, when, count, col
dtf1.where(col('AverageTemperature').isNull()).count()
Out[10]:
364130
In [11]:
import pyspark.sql.functions as f
dtf1.where(col('AverageTemperature').isNull() & (col('Country') == 'Denmark')).show(5)
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-12-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-08-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows

Como existen varios registros con ausencia de valor y existe un dataset con las elevaciones por latitud y longitud que tiene la misma cantidad de filas, para evitar inconsistencias se unirán ambos datasets y posteriormente se procederá a hacer el tratamiento de valores ausentes.

In [12]:
# convirtiendo a cadena la columna timestamps
from pyspark.sql.types import IntegerType, StringType, DoubleType
from pyspark.sql.functions import udf

# registrando UDF para conversion
def timestampsToString(dt):
    return str(dt) + ''

timestampToString_udf = udf(lambda z: timestampsToString(z), StringType())
In [13]:
dtf1.select('dt', timestampToString_udf('dt').alias('dt_str')).show(5)
+-------------------+-------------------+
|                 dt|             dt_str|
+-------------------+-------------------+
|1743-11-01 00:00:00|1743-11-01 00:00:00|
|1743-12-01 00:00:00|1743-12-01 00:00:00|
|1744-01-01 00:00:00|1744-01-01 00:00:00|
|1744-02-01 00:00:00|1744-02-01 00:00:00|
|1744-03-01 00:00:00|1744-03-01 00:00:00|
+-------------------+-------------------+
only showing top 5 rows

In [14]:
# aplicando udf y reemplazando columna a tipo de dato String
dtf1 = dtf1.withColumn('dt', timestampToString_udf('dt'))
dtf1.printSchema()
root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)

Dataset Elevaciones (metros sobre el nivel del mar)

El dataset 2 contiene datos de elevación (altitud sobre el nivel del mar) de los puntos cardinales de las ciudades del dataset 1.

Se ha extraído de la base de datos http://srtm.csi.cgiar.org usando la herramienta https://github.com/Jorl17/open-elevation con el script elevation-getter.py que consulta a un servicios web de una instancia de open-elevation, sen envían como parámetros los puntos cardinales (latitud, longitud) de cada registro del dataset 1 y se obtiene la elevación en metros sobre el nivel del mar.

In [15]:
## Cargando dataset elevationsByCity.csv
#dtf2 = spark.read.csv('hdfs:///datasets/elevationsByCity.1000000.csv',
# dtf2 = spark.read.csv('./elevationsByCity-2M.csv',
dtf2 = spark.read.csv('./elevationsByCity.csv',
                       inferSchema=True, 
                       header=True)
In [16]:
dtf2.printSchema()
root
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Elevation: integer (nullable = true)

In [17]:
dtf2Len = dtf2.count()
print(dtf2Len)
8599212
In [18]:
# valores marcados como erroneos
missingElevation = dtf2.filter(dtf2.Elevation == -5555)
missingElevation.show()
+--------+---------+---------+
|Latitude|Longitude|Elevation|
+--------+---------+---------+
|  21.70N|   77.02E|    -5555|
|  36.17N|  139.23E|    -5555|
|  50.63N|   13.94E|    -5555|
|  13.66N|  101.56E|    -5555|
|   7.23N|    5.68E|    -5555|
|  57.05N|   30.98E|    -5555|
|  36.17N|  119.34E|    -5555|
|  40.99S|  174.67E|    -5555|
|  39.38N|  104.05W|    -5555|
|  36.17N|  109.39E|    -5555|
+--------+---------+---------+

In [19]:
# teniendo las latitudes y longitudes con elevacion errónea, la forma de corregir es
# copiando la elevación de un punto que tenga las mismas coordenadas geográficas.
lMissing = missingElevation.collect()
In [20]:
elevations = {}
for lm in lMissing:
    temp = dtf2.filter((dtf2.Latitude == lm["Latitude"]) & (dtf2.Longitude == lm["Longitude"])).head(5)
    elevation = 0
    for i in range(0,5):
        if temp[i][2] != -5555:
            elevation = temp[i][2]
            break
    elevations[temp[1][0]] = elevation
    # elevations.append((temp[1][0], temp[1][1], elevation))
print(elevations)
{'21.70N': 336, '36.17N': 1144, '50.63N': 285, '13.66N': 25, '7.23N': 263, '57.05N': 79, '40.99S': 0, '39.38N': 1761}
In [21]:
def replaceMissingElevation(latitude, elevation):
    if elevation == -5555:
        return elevations[latitude]
    return elevation
replaceMissingElevation_udf = udf(replaceMissingElevation, IntegerType())
In [22]:
# aplicando correccion
dfe = dtf2.withColumn("Elevation", replaceMissingElevation_udf(dtf2['Latitude'], dtf2['Elevation']))
dfe.show(3)
print(dfe.count())
elevations = []
+--------+---------+---------+
|Latitude|Longitude|Elevation|
+--------+---------+---------+
|  57.05N|   10.33E|        8|
|  57.05N|   10.33E|        8|
|  57.05N|   10.33E|        8|
+--------+---------+---------+
only showing top 3 rows

8599212
In [23]:
# comprobando
print("total registros:", dfe.count())
print("Registros erróneos:", dfe.filter(dfe.Elevation == -5555).count())
total registros: 8599212
Registros erróneos: 0
In [24]:
dfe.printSchema()
root
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Elevation: integer (nullable = true)

In [25]:
dfe.filter(dfe.Elevation > 2500).count()
Out[25]:
74887
In [26]:
# Comprobando ausencia de valores en latitud y longitud
print(dfe.columns)
print(dtf1.columns)

print("Nulls in dfe")
dfe.select([count(when(isnan(c), c)).alias(c) for c in ['Latitude', 'Longitude']]).show()
print("Nulls in dtf1")
dtf1.select([count(when(isnan(c), c)).alias(c) for c in ['Latitude', 'Longitude']]).show()
['Latitude', 'Longitude', 'Elevation']
['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City', 'Country', 'Latitude', 'Longitude']
Nulls in dfe
+--------+---------+
|Latitude|Longitude|
+--------+---------+
|       0|        0|
+--------+---------+

Nulls in dtf1
+--------+---------+
|Latitude|Longitude|
+--------+---------+
|       0|        0|
+--------+---------+

In [27]:
dtf1.printSchema()
root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)

In [28]:
dfe.printSchema()
root
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Elevation: integer (nullable = true)

In [29]:
# Comprobando que latitud y longitud sean consistentes en ambos datasets
dtf1.select("Latitude", "Longitude").exceptAll(dfe.select("Latitude", "Longitude")).count()
Out[29]:
0
In [30]:
dtf1.select([count(when(isnan(c), c)).alias(c) for c in ['AverageTemperature','AverageTemperatureUncertainty']]).show()
+------------------+-----------------------------+
|AverageTemperature|AverageTemperatureUncertainty|
+------------------+-----------------------------+
|                 0|                            0|
+------------------+-----------------------------+

In [31]:
# ocurren algunos errores al unir los dataframes con join por columnas 
# latitud y longitud, entonces se crea una columna nueva id para 
# evitar errores al mezclar los dataframes.
In [32]:
from pyspark.sql import functions as F
from pyspark.sql import Window
# Agregando una columna indice para obtener por rangos
window = Window.orderBy(F.col('Latitude'), F.col('Longitude') )
dfe = dfe.withColumn('id', F.row_number().over(window))
dfe.show(10)
print(dfe.agg({"id": "max"}).collect()[0])
#dfeLen = dfe.count()
#print("count", dfeLen)
# window = Window.orderBy(F.col('Latitude'), F.col('Longitude'))
dtf1 = dtf1.withColumn('id', F.row_number().over(window))
dtf1.show(10)
print(dtf1.agg({"id": "max"}).collect()[0])
#dtf1Len = dtf1.count()
#print("count", dtf1Len)
+--------+---------+---------+---+
|Latitude|Longitude|Elevation| id|
+--------+---------+---------+---+
|   0.80N|  103.66E|        0|  1|
|   0.80N|  103.66E|        0|  2|
|   0.80N|  103.66E|        0|  3|
|   0.80N|  103.66E|        0|  4|
|   0.80N|  103.66E|        0|  5|
|   0.80N|  103.66E|        0|  6|
|   0.80N|  103.66E|        0|  7|
|   0.80N|  103.66E|        0|  8|
|   0.80N|  103.66E|        0|  9|
|   0.80N|  103.66E|        0| 10|
+--------+---------+---------+---+
only showing top 10 rows

Row(max(id)=8599212)
+-------------------+------------------+-----------------------------+-----------+--------+--------+---------+---+
|                 dt|AverageTemperature|AverageTemperatureUncertainty|       City| Country|Latitude|Longitude| id|
+-------------------+------------------+-----------------------------+-----------+--------+--------+---------+---+
|1825-01-01 00:00:00|25.331999999999997|                        3.194|Johor Bahru|Malaysia|   0.80N|  103.66E|  1|
|1825-02-01 00:00:00|25.549000000000003|           1.4709999999999999|Johor Bahru|Malaysia|   0.80N|  103.66E|  2|
|1825-03-01 00:00:00|            26.285|                        2.193|Johor Bahru|Malaysia|   0.80N|  103.66E|  3|
|1825-04-01 00:00:00|            26.999|                        2.571|Johor Bahru|Malaysia|   0.80N|  103.66E|  4|
|1825-05-01 00:00:00|27.450000000000006|                        1.591|Johor Bahru|Malaysia|   0.80N|  103.66E|  5|
|1825-06-01 00:00:00|27.732000000000006|           0.9009999999999999|Johor Bahru|Malaysia|   0.80N|  103.66E|  6|
|1825-07-01 00:00:00|            27.765|                         3.14|Johor Bahru|Malaysia|   0.80N|  103.66E|  7|
|1825-08-01 00:00:00|            26.399|                        2.432|Johor Bahru|Malaysia|   0.80N|  103.66E|  8|
|1825-09-01 00:00:00|            26.143|                        1.976|Johor Bahru|Malaysia|   0.80N|  103.66E|  9|
|1825-10-01 00:00:00|26.040000000000006|                        2.577|Johor Bahru|Malaysia|   0.80N|  103.66E| 10|
+-------------------+------------------+-----------------------------+-----------+--------+--------+---------+---+
only showing top 10 rows

Row(max(id)=8599212)
In [33]:
dtf1.printSchema()
root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- id: integer (nullable = true)

In [34]:
dfe.printSchema()
root
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Elevation: integer (nullable = true)
 |-- id: integer (nullable = true)

In [35]:
temp1 = dfe.select("id", "Elevation")
temp1.show(5)
+---+---------+
| id|Elevation|
+---+---------+
|  1|        0|
|  2|        0|
|  3|        0|
|  4|        0|
|  5|        0|
+---+---------+
only showing top 5 rows

In [37]:
#df1.alias("a").join(
#    df2.alias("b"), df1['id'] == df2['id']
#).select("a.id", "a.val1", "b.val2").show()
#ndf = dtf1.alias("a").join(dfe.alias("b"), dtf1["id"]==dfe["id"])\
#    .select("dt", "AverageTemperature", "AverageTemperatureUncertainty", \
#            "City", "Country", "Latitude", "Longitude", "Elevation")
#ndf = dtf1.join(dfe, on=["id"], how="inner")
ndf = dtf1.join(temp1, on=["id"], how="inner")
ndf.show()
print(ndf.count())
+---+-------------------+------------------+-----------------------------+-----------+--------+--------+---------+---------+
| id|                 dt|AverageTemperature|AverageTemperatureUncertainty|       City| Country|Latitude|Longitude|Elevation|
+---+-------------------+------------------+-----------------------------+-----------+--------+--------+---------+---------+
|  1|1825-01-01 00:00:00|25.331999999999997|                        3.194|Johor Bahru|Malaysia|   0.80N|  103.66E|        0|
|  2|1825-02-01 00:00:00|25.549000000000003|           1.4709999999999999|Johor Bahru|Malaysia|   0.80N|  103.66E|        0|
|  3|1825-03-01 00:00:00|            26.285|                        2.193|Johor Bahru|Malaysia|   0.80N|  103.66E|        0|
|  4|1825-04-01 00:00:00|            26.999|                        2.571|Johor Bahru|Malaysia|   0.80N|  103.66E|        0|
|  5|1825-05-01 00:00:00|27.450000000000006|                        1.591|Johor Bahru|Malaysia|   0.80N|  103.66E|        0|
|  6|1825-06-01 00:00:00|27.732000000000006|           0.9009999999999999|Johor Bahru|Malaysia|   0.80N|  103.66E|        0|
|  7|1825-07-01 00:00:00|            27.765|                         3.14|Johor Bahru|Malaysia|   0.80N|  103.66E|        0|
|  8|1825-08-01 00:00:00|            26.399|                        2.432|Johor Bahru|Malaysia|   0.80N|  103.66E|        0|
|  9|1825-09-01 00:00:00|            26.143|                        1.976|Johor Bahru|Malaysia|   0.80N|  103.66E|        0|
| 10|1825-10-01 00:00:00|26.040000000000006|                        2.577|Johor Bahru|Malaysia|   0.80N|  103.66E|        0|
| 11|1825-11-01 00:00:00|26.249000000000002|                        2.025|Johor Bahru|Malaysia|   0.80N|  103.66E|        0|
| 12|1825-12-01 00:00:00|            25.161|                         1.47|Johor Bahru|Malaysia|   0.80N|  103.66E|        0|
| 13|1826-01-01 00:00:00|              null|                         null|Johor Bahru|Malaysia|   0.80N|  103.66E|        0|
| 14|1826-02-01 00:00:00|              null|                         null|Johor Bahru|Malaysia|   0.80N|  103.66E|        0|
| 15|1826-03-01 00:00:00|              null|                         null|Johor Bahru|Malaysia|   0.80N|  103.66E|        0|
| 16|1826-04-01 00:00:00|              null|                         null|Johor Bahru|Malaysia|   0.80N|  103.66E|        0|
| 17|1826-05-01 00:00:00|              null|                         null|Johor Bahru|Malaysia|   0.80N|  103.66E|        0|
| 18|1826-06-01 00:00:00|              null|                         null|Johor Bahru|Malaysia|   0.80N|  103.66E|        0|
| 19|1826-07-01 00:00:00|              null|                         null|Johor Bahru|Malaysia|   0.80N|  103.66E|        0|
| 20|1826-08-01 00:00:00|              null|                         null|Johor Bahru|Malaysia|   0.80N|  103.66E|        0|
+---+-------------------+------------------+-----------------------------+-----------+--------+--------+---------+---------+
only showing top 20 rows

8599212
In [38]:
print(ndf.count())
8599212
In [39]:
ndf.columns
Out[39]:
['id',
 'dt',
 'AverageTemperature',
 'AverageTemperatureUncertainty',
 'City',
 'Country',
 'Latitude',
 'Longitude',
 'Elevation']

Tratamiento de ausencia de valores de temperatura

Para la columna AverageTemperature (temperatura promedio por día) haciendo una generalización y para evitar la estimaciones erróneas se ha optado por eliminar las filas con temperatura promedio sin valor. Esto por que la ausencia de valores se parece dar en una proporción pequeña de casos menor al 5%.

Por otro no es seguro intentar estimar o rellenar la ausencia de este dato reemplazando por un valor promedio ya que esto implicaría ver los casos particulares por ciudades y si existe una minoría de registros que no tengan el dato de temperatura promedio y esto podría representar mas trabajo a detalle que podría no tener relevancia en el resumen final de datos.

In [40]:
ndf.filter(ndf.AverageTemperature.isNull()).count()
Out[40]:
364130
In [41]:
# Eliminando las filas con dato de temperatura promedio 
ndf = ndf.filter(ndf.AverageTemperature.isNotNull())
In [42]:
ndf.show(5)
print('count:', ndf.count())
+---+-------------------+------------------+-----------------------------+-----------+--------+--------+---------+---------+
| id|                 dt|AverageTemperature|AverageTemperatureUncertainty|       City| Country|Latitude|Longitude|Elevation|
+---+-------------------+------------------+-----------------------------+-----------+--------+--------+---------+---------+
|  1|1825-01-01 00:00:00|25.331999999999997|                        3.194|Johor Bahru|Malaysia|   0.80N|  103.66E|        0|
|  2|1825-02-01 00:00:00|25.549000000000003|           1.4709999999999999|Johor Bahru|Malaysia|   0.80N|  103.66E|        0|
|  3|1825-03-01 00:00:00|            26.285|                        2.193|Johor Bahru|Malaysia|   0.80N|  103.66E|        0|
|  4|1825-04-01 00:00:00|            26.999|                        2.571|Johor Bahru|Malaysia|   0.80N|  103.66E|        0|
|  5|1825-05-01 00:00:00|27.450000000000006|                        1.591|Johor Bahru|Malaysia|   0.80N|  103.66E|        0|
+---+-------------------+------------------+-----------------------------+-----------+--------+--------+---------+---------+
only showing top 5 rows

count: 8235082

La columna AverageTemperatureUncertainty indica un valor de incertidumbre para la temperatura promedio con un intervalo de confianza de 95%, en los casos en que no se tiene este valor en un registro (columna) se lo rellena con la mediana global a modo de generalizar el error.

In [44]:
# from pyspark.sql.functions import approxQuantile
med = ndf.approxQuantile("AverageTemperatureUncertainty", [0.5], 0.25)
print(med)
AverageTemperatureUncertainty_median = med[0]
[0.539]
In [45]:
# reemplazando ausencia de valores con la mediana
replaceNullWithMedian_udf = udf(lambda temp: AverageTemperatureUncertainty_median if temp is None else temp, DoubleType())
ndf = ndf.withColumn("AverageTemperatureUncertainty", replaceNullWithMedian_udf(ndf['AverageTemperatureUncertainty']))
ndf.show(5)
ndf.count()
#ndf.filter(ndf.AverageTemperatureUncertainty.isNull()).count()
+---+-------------------+------------------+-----------------------------+-----------+--------+--------+---------+---------+
| id|                 dt|AverageTemperature|AverageTemperatureUncertainty|       City| Country|Latitude|Longitude|Elevation|
+---+-------------------+------------------+-----------------------------+-----------+--------+--------+---------+---------+
|  1|1825-01-01 00:00:00|25.331999999999997|                        3.194|Johor Bahru|Malaysia|   0.80N|  103.66E|        0|
|  2|1825-02-01 00:00:00|25.549000000000003|           1.4709999999999999|Johor Bahru|Malaysia|   0.80N|  103.66E|        0|
|  3|1825-03-01 00:00:00|            26.285|                        2.193|Johor Bahru|Malaysia|   0.80N|  103.66E|        0|
|  4|1825-04-01 00:00:00|            26.999|                        2.571|Johor Bahru|Malaysia|   0.80N|  103.66E|        0|
|  5|1825-05-01 00:00:00|27.450000000000006|                        1.591|Johor Bahru|Malaysia|   0.80N|  103.66E|        0|
+---+-------------------+------------------+-----------------------------+-----------+--------+--------+---------+---------+
only showing top 5 rows

Out[45]:
8235082
In [46]:
ndf.printSchema()
root
 |-- id: integer (nullable = true)
 |-- dt: string (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Elevation: integer (nullable = true)

In [48]:
# Comprobando que no hayan nulls
ndf.filter(ndf.AverageTemperature.isNull()).count()
Out[48]:
0
In [49]:
ndf.write.parquet("ndf.parquet")

Ya se tiene guardado en el archivo ndf.parquet el dataset sin ausencia de valores y con el dato de elevación en metro sobre el nivel del mar por cada ciudad.

Las gráficas y generación de modelos de predicción se ven en los siguientes notebooks: